#!/usr/bin/python

# Default container image.
# FIXME: switch to a release image when octopus is ready!!!
DEFAULT_IMAGE = 'ceph/daemon-base:latest-master-devel'

# Default host locations.
DATA_DIR = '/var/lib/ceph'
LOG_DIR = '/var/log/ceph'
LOGROTATE_DIR = '/etc/logrotate.d'
UNIT_DIR = '/etc/systemd/system'

# Permission bits applied to the directories created under LOG_DIR/DATA_DIR.
LOG_DIR_MODE = 0o770
DATA_DIR_MODE = 0o700

# Container runtimes, most preferred first (podman over docker).
CONTAINER_PREFERENCE = ['podman', 'docker']

# Shell prompt string (raw: the backslash escapes are for the shell).
CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ '

"""
You can invoke ceph-daemon in two ways:

1. The normal way, at the command line.

2. By piping the script to the python3 binary.  In this latter case, you should
   prepend one or more lines to the beginning of the script.

   For arguments,

       injected_argv = [...]

   e.g.,

       injected_argv = ['ls']

   For reading stdin from the '--config-and-keyring -' or
   '--config-and-keyrings -' argument,

       injected_stdin = '...'
"""

import argparse
try:
    from ConfigParser import ConfigParser   # py2
except ImportError:
    from configparser import ConfigParser   # py3
import fcntl
try:
    from StringIO import StringIO           # py2
except ImportError:
    from io import StringIO                 # py3
import json
import logging
import os
import random
import select
import shutil
import socket
import string
import subprocess
import sys
import tempfile
import time
try:
    from typing import Dict, List, Tuple, Optional, Union
except ImportError:
    pass
import uuid

from distutils.spawn import find_executable
from functools import wraps
from glob import iglob


# Path to the container runtime binary used to build every container
# command line.  None here; presumably assigned at startup (e.g. from
# CONTAINER_PREFERENCE) by code outside this section -- TODO confirm.
container_path = None  # type: Optional[str]

class Error(Exception):
    """Error raised for expected failures, e.g. an fsid that cannot be
    inferred or a missing keyring."""

##################################
# Popen wrappers, lifted from ceph-volume

def call(command, desc=None, verbose=False, **kwargs):
    """
    Wrap subprocess.Popen to

    - log stdout/stderr to a logger,
    - decode utf-8
    - cleanly return out, err, returncode

    If verbose=True, log at info (instead of debug) level.

    :param command: argv list passed to subprocess.Popen
    :param desc: prefix for every logged output line (default: command[0])
    :param verbose: log output lines at info instead of debug level
    :param verbose_on_failure: keyword-only, default True.  On a non-zero
                               exit status (and verbose=False), re-log all
                               captured stdout/stderr at info level
    :param kwargs: remaining keyword arguments are passed on to Popen
    :return: (stdout, stderr, returncode); stdout/stderr are text
    """
    import codecs
    import errno

    if not desc:
        desc = command[0]
    verbose_on_failure = kwargs.pop('verbose_on_failure', True)
    log = logger.info if verbose else logger.debug

    logger.debug("Running command: %s" % ' '.join(command))
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        **kwargs
    )

    stdout_fd = process.stdout.fileno()
    stderr_fd = process.stderr.fileno()
    streams = {}
    for fd, name in ((stdout_fd, 'stdout'), (stderr_fd, 'stderr')):
        # non-blocking, so os.read() returns only what is already buffered
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        streams[fd] = {
            'name': name,
            # incremental decoder: a multi-byte UTF-8 character split across
            # two reads is completed on the next read instead of raising
            # UnicodeDecodeError; invalid bytes become U+FFFD
            'decoder': codecs.getincrementaldecoder('utf-8')('replace'),
            'chunks': [],    # all decoded output, in order
            'partial': '',   # trailing text not yet terminated by '\n'
            'eof': False,
        }

    def read_available(fd):
        # Read fd until EOF or until nothing more is available right now,
        # logging every complete line as it arrives.
        s = streams[fd]
        while not s['eof']:
            try:
                data = os.read(fd, 1024)
            except (IOError, OSError) as e:
                if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    return  # drained for now
                # unexpected read error: stop reading this stream rather
                # than spinning on it
                s['eof'] = True
                return
            eof = not data
            if isinstance(data, str):
                # py2: os.read() already returns str
                text = data
            else:
                text = s['decoder'].decode(data, final=eof)
            if text:
                s['chunks'].append(text)
                lines = (s['partial'] + text).split('\n')
                s['partial'] = lines.pop()
                for line in lines:
                    log(desc + ':' + s['name'] + ' ' + line)
            if eof:
                s['eof'] = True

    while True:
        # streams at EOF are dropped so select() does not spin on them
        open_fds = [fd for fd in (stdout_fd, stderr_fd)
                    if not streams[fd]['eof']]
        if not open_fds:
            break
        if process.poll() is not None:
            # the process has exited: drain everything already buffered,
            # then stop even if a (grand)child still holds a pipe open
            for fd in open_fds:
                read_available(fd)
            break
        readable, _, _ = select.select(open_fds, [], [])
        for fd in readable:
            read_available(fd)

    returncode = process.wait()
    process.stdout.close()
    process.stderr.close()

    out = ''.join(streams[stdout_fd]['chunks'])
    err = ''.join(streams[stderr_fd]['chunks'])

    # log a final line that had no trailing newline
    for fd in (stdout_fd, stderr_fd):
        s = streams[fd]
        if s['partial'] != '':
            log(desc + ':' + s['name'] + ' ' + s['partial'])

    if returncode != 0 and verbose_on_failure and not verbose:
        # dump stdout + stderr
        logger.info('Non-zero exit code %d from %s' % (returncode, ' '.join(command)))
        for line in out.splitlines():
            logger.info(desc + ':stdout ' + line)
        for line in err.splitlines():
            logger.info(desc + ':stderr ' + line)

    return out, err, returncode

def call_throws(command, **kwargs):
    """
    Run *command* via call() and raise if it exits non-zero.

    :return: the (out, err, returncode) tuple from call()
    :raises RuntimeError: when the return code is non-zero
    """
    result = call(command, **kwargs)
    if result[2]:
        raise RuntimeError('Failed command: %s' % ' '.join(command))
    return result

##################################

def pathify(p):
    # type: (str) -> str
    """Return *p* unchanged if it starts with '/', else join it onto the cwd."""
    if p.startswith('/'):
        return p
    return os.path.join(os.getcwd(), p)

def get_hostname():
    # type: () -> str
    """Return the hostname as reported by socket.gethostname()."""
    hostname = socket.gethostname()
    return hostname

def get_fqdn():
    # type: () -> str
    """Return socket.getfqdn(), falling back to the hostname if it is empty."""
    fqdn = socket.getfqdn()
    if not fqdn:
        fqdn = socket.gethostname()
    return fqdn

def generate_password():
    # type: () -> str
    """
    Return a random 10-character password of lowercase letters and digits.

    Characters come from random.SystemRandom (backed by os.urandom): the
    module-level random functions use a predictable Mersenne Twister and
    must not be used for secrets.  SystemRandom exists on both py2 and py3,
    unlike the py3.6+ `secrets` module.
    """
    rng = random.SystemRandom()
    alphabet = string.ascii_lowercase + string.digits
    return ''.join(rng.choice(alphabet) for _ in range(10))

def make_fsid():
    # type: () -> str
    """Return a new cluster fsid: a version-1 UUID in canonical string form."""
    fsid = uuid.uuid1()
    return str(fsid)

def is_fsid(s):
    # type: (str) -> bool
    """Return True if *s* parses as a UUID (see uuid.UUID), else False."""
    try:
        uuid.UUID(s)
        return True
    except ValueError:
        return False

def infer_fsid(func):
    """
    Decorator: fill in args.fsid when it was not given.

    If args.fsid is already set it is used as-is.  Otherwise the entries of
    args.data_dir that parse as UUIDs (is_fsid) are collected.  Exactly one
    such entry is adopted as args.fsid, none leaves args.fsid untouched, and
    more than one raises Error.  The wrapped function takes no arguments.
    """
    @wraps(func)
    def _wrapper():
        if args.fsid:
            logger.debug('Using specified fsid: %s' % args.fsid)
            return func()

        candidates = []
        if os.path.exists(args.data_dir):
            candidates = [entry for entry in os.listdir(args.data_dir)
                          if is_fsid(entry)]
        logger.debug('Found fsids %s' % str(candidates))

        if len(candidates) > 1:
            raise Error('cannot infer fsid, must specify --fsid')
        if len(candidates) == 1:
            logger.info('Inferring fsid %s' % candidates[0])
            args.fsid = candidates[0]
        # TODO: raise when no fsid was found?
        return func()
    return _wrapper

def makedirs(dir, uid, gid, mode):
    # type: (str, int, int, int) -> None
    """
    Ensure directory *dir* exists, is owned by uid:gid and has exactly *mode*.

    Missing parent directories are created too, but only *dir* itself is
    chowned/chmodded.  The explicit chmod is required because the mode
    passed to os.makedirs() is filtered through the process umask.

    :raises OSError: if *dir* cannot be created or exists but is not a
                     directory
    """
    try:
        os.makedirs(dir, mode=mode)
    except OSError:
        # tolerate a pre-existing (or concurrently created) directory
        if not os.path.isdir(dir):
            raise
    os.chown(dir, uid, gid)
    os.chmod(dir, mode)

def get_data_dir(fsid, t, n):
    # type: (str, str, Union[int, str]) -> str
    """Return <args.data_dir>/<fsid>/<t>.<n>, the data dir of daemon <t>.<n>."""
    daemon_name = '%s.%s' % (t, n)
    return os.path.join(args.data_dir, fsid, daemon_name)

def get_log_dir(fsid):
    # type: (str) -> str
    """Return <args.log_dir>/<fsid>, the per-cluster log directory."""
    base = args.log_dir
    return os.path.join(base, fsid)

def make_data_dir_base(fsid, uid, gid):
    # type: (str, int, int) -> str
    """
    Create <args.data_dir>/<fsid> and its crash/ and crash/posted/
    subdirectories, each owned by uid:gid with DATA_DIR_MODE.

    :return: the <args.data_dir>/<fsid> path
    """
    data_dir_base = os.path.join(args.data_dir, fsid)
    crash_dir = os.path.join(data_dir_base, 'crash')
    for path in (data_dir_base, crash_dir, os.path.join(crash_dir, 'posted')):
        makedirs(path, uid, gid, DATA_DIR_MODE)
    return data_dir_base

def make_data_dir(fsid, daemon_type, daemon_id, uid=None, gid=None):
    # type: (str, str, Union[int, str], Optional[int], Optional[int]) -> str
    """
    Create the cluster base dirs and the data dir of one daemon.

    :param uid, gid: owner of the created directories; when either is None
                     they are looked up with extract_uid_gid()
    :return: the daemon's data directory (see get_data_dir)
    """
    # compare with None: 0 (root) is a valid uid/gid and must not trigger
    # the lookup
    if uid is None or gid is None:
        (uid, gid) = extract_uid_gid()
    make_data_dir_base(fsid, uid, gid)
    data_dir = get_data_dir(fsid, daemon_type, daemon_id)
    makedirs(data_dir, uid, gid, DATA_DIR_MODE)
    return data_dir

def make_log_dir(fsid, uid=None, gid=None):
    # type: (str, Optional[int], Optional[int]) -> str
    """
    Create the per-cluster log directory with LOG_DIR_MODE.

    :param uid, gid: owner of the directory; when either is None they are
                     looked up with extract_uid_gid()
    :return: the log directory path (see get_log_dir)
    """
    # compare with None: 0 is a valid uid/gid
    if uid is None or gid is None:
        (uid, gid) = extract_uid_gid()
    log_dir = get_log_dir(fsid)
    makedirs(log_dir, uid, gid, LOG_DIR_MODE)
    return log_dir

def copy_file(src, dst, uid=None, gid=None):
    # type: (str, str, Optional[int], Optional[int]) -> str
    """
    Copy a file from src to dst and chown the copy to uid:gid.

    If dst is a directory the file keeps its basename inside it.  Only file
    contents are copied (shutil.copyfile), not permission bits.

    :param uid, gid: owner of the copy; when either is None they are looked
                     up with extract_uid_gid()
    :return: the destination file path
    """
    # compare with None: 0 is a valid uid/gid
    if uid is None or gid is None:
        (uid, gid) = extract_uid_gid()

    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))

    logger.debug('Copy \'%s\' -> \'%s\'' % (src, dst))
    shutil.copyfile(src, dst)
    os.chown(dst, uid, gid)

    return dst

def move_file(src, dst, uid=None, gid=None):
    # type: (str, str, Optional[int], Optional[int]) -> str
    """
    Move a file (or symlink) from src to dst and chown it to uid:gid.

    If dst is a directory the file keeps its basename inside it.  Symlinks
    are recreated at dst with the same target and the original removed.

    :param uid, gid: new owner; when either is None they are looked up with
                     extract_uid_gid()
    :return: the destination path
    """
    # compare with None: 0 is a valid uid/gid
    if uid is None or gid is None:
        (uid, gid) = extract_uid_gid()

    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))

    if os.path.islink(src):
        # shutil.move() in python2 does not handle symlinks correctly
        src_rl = os.readlink(src)
        logger.debug('symlink \'%s\' -> \'%s\'' % (dst, src_rl))
        os.symlink(src_rl, dst)
        os.unlink(src)
    else:
        logger.debug('Move \'%s\' -> \'%s\'' % (src, dst))
        shutil.move(src, dst)
    os.chown(dst, uid, gid)

    return dst

def find_program(filename):
    # type: (str) -> str
    """
    Return the path of executable *filename* found on $PATH.

    Prefers shutil.which (py3.3+).  distutils is deprecated (PEP 632) and
    removed in Python 3.12, and its find_executable() returns the bare
    relative name when a file of that name exists in the current directory.
    find_executable is kept only as a fallback for older interpreters.

    :raises ValueError: if the program cannot be found
    """
    which = getattr(shutil, 'which', None)
    if which is not None:
        name = which(filename)
    else:
        name = find_executable(filename)
    if name is None:
        raise ValueError('%s not found' % filename)
    return name

def get_unit_name(fsid, daemon_type, daemon_id=None):
    # type: (str, str, Optional[Union[int, str]]) -> str
    """
    Return the systemd unit instance name 'ceph-<fsid>@<type>[.<id>]'.

    Accepts either a full daemon name in daemon_type (daemon_id=None) or a
    separate type and id.
    """
    name = 'ceph-%s@%s' % (fsid, daemon_type)
    if daemon_id is None:
        return name
    return '%s.%s' % (name, daemon_id)

def check_unit(unit_name):
    # type: (str) -> Tuple[bool, str]
    """
    Return (enabled, state) for a systemd unit.

    enabled is True when `systemctl is-enabled` exits 0.  state is derived
    from the text printed by `systemctl is-active` rather than its exit
    code (systemctl uses various exit codes depending on service state, and
    the text is more explicit): 'running', 'stopped', 'error' or 'unknown'.
    Failures to run systemctl are logged and yield (False, 'unknown').
    """
    try:
        _, _, code = call(['systemctl', 'is-enabled', unit_name],
                          verbose_on_failure=False)
        enabled = (code == 0)
    except Exception as e:
        logger.warning('unable to run systemctl: %s' % e)
        enabled = False

    active_to_state = {
        'active': 'running',
        'inactive': 'stopped',
        'failed': 'error',
        'auto-restart': 'error',
    }
    try:
        out, _, _ = call(['systemctl', 'is-active', unit_name],
                         verbose_on_failure=False)
        state = active_to_state.get(out.strip(), 'unknown')
    except Exception as e:
        logger.warning('unable to run systemctl: %s' % e)
        state = 'unknown'
    return (enabled, state)

def get_legacy_config_fsid(cluster, legacy_dir=None):
    # type: (str, Optional[str]) -> Optional[str]
    """
    Return the [global] fsid from /etc/ceph/<cluster>.conf, or None.

    :param legacy_dir: optional root prefix prepended to the config path
    """
    if legacy_dir is None:
        config_file = '/etc/ceph/%s.conf' % cluster
    else:
        config_file = os.path.abspath(legacy_dir + '/etc/ceph/%s.conf' % cluster)

    parser = ConfigParser()
    parser.read(config_file)

    if not (parser.has_section('global') and parser.has_option('global', 'fsid')):
        return None
    return parser.get('global', 'fsid')

def get_legacy_daemon_fsid(cluster, daemon_type, daemon_id, legacy_dir=None):
    # type: (str, str, Union[int, str], Optional[str]) -> Optional[str]
    """
    Return the fsid of a legacy (non-containerized) daemon, or None.

    For an osd the 'ceph_fsid' file in its data dir is tried first; in all
    other cases, or when that file is missing or empty, the cluster config
    is consulted via get_legacy_config_fsid().

    :param legacy_dir: optional root prefix prepended to the paths read
    """
    fsid = None
    if daemon_type == 'osd':
        # NOTE(review): the 'ceph-' prefix is used regardless of *cluster*
        fsid_file = os.path.join(args.data_dir, daemon_type,
                                 'ceph-%s' % daemon_id, 'ceph_fsid')
        if legacy_dir is not None:
            fsid_file = os.path.abspath(legacy_dir + fsid_file)
        try:
            with open(fsid_file, 'r') as f:
                fsid = f.read().strip()
        except IOError:
            pass
    return fsid or get_legacy_config_fsid(cluster, legacy_dir=legacy_dir)

def get_daemon_args(fsid, daemon_type, daemon_id):
    # type: (str, str, Union[int, str]) -> List[str]
    """
    Return the command-line arguments common to every ceph daemon: log to
    stderr instead of a file and drop privileges to ceph:ceph.  The
    parameters are currently unused.
    """
    logging_args = ['--default-log-to-file=false',
                    '--default-log-to-stderr=true']
    identity_args = ['--setuser', 'ceph', '--setgroup', 'ceph']
    return logging_args + identity_args

def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
                       config=None, keyring=None):
    # type: (str, str, Union[int, str], int, int, Optional[str], Optional[str]) ->  None
    """
    Create a daemon's data and log directories and, when given, write its
    'config' and 'keyring' files into the data dir (mode 0600, owned by
    uid:gid).
    """
    # pass uid/gid through: without them each helper runs a container
    # (extract_uid_gid) just to rediscover ids we already have
    data_dir = make_data_dir(fsid, daemon_type, daemon_id, uid, gid)
    make_log_dir(fsid, uid, gid)

    for name, content in (('config', config), ('keyring', keyring)):
        if not content:
            continue
        with open(os.path.join(data_dir, name), 'w') as f:
            # restrict permissions before any secret is written
            os.fchmod(f.fileno(), 0o600)
            os.fchown(f.fileno(), uid, gid)
            f.write(content)

def get_config_and_keyring():
    # type: () -> Tuple[str, str]
    """
    Return (config, keyring) for the daemon being deployed.

    When args.config_and_keyring is set, both come from the 'config' and
    'keyring' keys of a JSON document read from that file, or from stdin
    (or the injected_stdin global) when it is '-'.  Otherwise the keyring
    is built from args.name/args.key or read from args.keyring, and the
    config is read from args.config.

    :raises Error: if no keyring source was given
    """
    source = args.config_and_keyring
    if source:
        if source != '-':
            with open(source, 'r') as f:
                raw = f.read()
        else:
            try:
                raw = injected_stdin  # type: ignore
            except NameError:
                raw = sys.stdin.read()
        parsed = json.loads(raw)
        return (parsed.get('config'), parsed.get('keyring'))

    if args.key:
        keyring = '[%s]\n\tkey = %s\n' % (args.name, args.key)
    elif args.keyring:
        with open(args.keyring, 'r') as f:
            keyring = f.read()
    else:
        raise Error('no keyring provided')
    with open(args.config, 'r') as f:
        config = f.read()
    return (config, keyring)

def get_config_and_both_keyrings():
    # type: () -> Tuple[str, str, Optional[str]]
    """
    Return (config, keyring, crash_keyring) for the daemon being deployed.

    When args.config_and_keyrings is set, the values come from the keys
    'config', 'keyring' and 'crash_keyring' of a JSON document read from
    that file, or from stdin (or the injected_stdin global) when it is '-';
    missing keys yield None.  Otherwise the keyring is built from
    args.name/args.key or read from args.keyring, the optional crash
    keyring is read from args.crash_keyring and the config from args.config.

    :raises Error: if no keyring source was given
    """
    source = args.config_and_keyrings
    if source:
        if source != '-':
            with open(source, 'r') as f:
                raw = f.read()
        else:
            try:
                raw = injected_stdin  # type: ignore
            except NameError:
                raw = sys.stdin.read()
        parsed = json.loads(raw)
        return (parsed.get('config'), parsed.get('keyring'),
                parsed.get('crash_keyring'))

    if args.key:
        keyring = '[%s]\n\tkey = %s\n' % (args.name, args.key)
    elif args.keyring:
        with open(args.keyring, 'r') as f:
            keyring = f.read()
    else:
        raise Error('no keyring provided')

    crash_keyring = None
    if args.crash_keyring:
        with open(args.crash_keyring, 'r') as f:
            crash_keyring = f.read()

    with open(args.config, 'r') as f:
        config = f.read()
    return (config, keyring, crash_keyring)

def get_container_mounts(fsid, daemon_type, daemon_id):
    # type: (str, str, Union[int, str, None]) -> Dict[str, str]
    """
    Return the bind mounts for a daemon container as
    {host_path: container_path[:options]}.

    - with an fsid: the run dir (if present), log dir and crash dir (if
      present) of the cluster;
    - with a daemon id: the daemon's data dir and its config as
      /etc/ceph/ceph.conf (plus the keyring for rbd-mirror);
    - mon/osd additionally get /dev and /run/udev, osd also /sys and the
      lvm run/lock dirs.
    """
    mounts = {}  # type: Dict[str, str]
    if fsid:
        run_path = os.path.join('/var/run/ceph', fsid)
        if os.path.exists(run_path):
            mounts[run_path] = '/var/run/ceph:z'
        log_dir = get_log_dir(fsid)
        mounts[log_dir] = '/var/log/ceph:z'
        # honour args.data_dir, which is where make_data_dir_base()
        # creates this directory
        crash_dir = os.path.join(args.data_dir, fsid, 'crash')
        if os.path.exists(crash_dir):
            mounts[crash_dir] = '/var/lib/ceph/crash:z'

    # 0 is a valid daemon id (e.g. osd.0); only None/'' mean "no daemon"
    if daemon_id is not None and daemon_id != '':
        data_dir = get_data_dir(fsid, daemon_type, daemon_id)
        if daemon_type == 'rgw':
            cdata_dir = '/var/lib/ceph/radosgw/ceph-rgw.%s' % (daemon_id)
        else:
            cdata_dir = '/var/lib/ceph/%s/ceph-%s' % (daemon_type, daemon_id)
        mounts[data_dir] = cdata_dir + ':z'
        mounts[data_dir + '/config'] = '/etc/ceph/ceph.conf:z'
        if daemon_type == 'rbd-mirror':
            # rbd-mirror does not search for its keyring in a data directory
            mounts[data_dir + '/keyring'] = '/etc/ceph/ceph.client.rbd-mirror.%s.keyring' % daemon_id

    if daemon_type in ['mon', 'osd']:
        mounts['/dev'] = '/dev'  # FIXME: narrow this down?
        mounts['/run/udev'] = '/run/udev'
    if daemon_type == 'osd':
        mounts['/sys'] = '/sys'  # for numa.cc, pick_address, cgroups, ...
        mounts['/run/lvm'] = '/run/lvm'
        mounts['/run/lock/lvm'] = '/run/lock/lvm'

    return mounts

def get_container(fsid, daemon_type, daemon_id, privileged=False,
                  container_args=None):
    # type: (str, str, Union[int, str], bool, Optional[List[str]]) -> CephContainer
    """
    Build the CephContainer that runs daemon <daemon_type>.<daemon_id> in
    the foreground.

    :param privileged: add --privileged (always added for mon and osd)
    :param container_args: extra arguments for the container runtime; the
                           list is copied, never modified
    """
    # copy: the old `container_args=[]` default was extended in place, so
    # '--privileged' accumulated across calls and leaked into callers' lists
    container_args = list(container_args or [])
    if daemon_type in ['mon', 'osd'] or privileged:
        # mon and osd need privileged in order for libudev to query devices
        container_args.append('--privileged')
    if daemon_type == 'rgw':
        entrypoint = '/usr/bin/radosgw'
        name = 'client.rgw.%s' % daemon_id
    elif daemon_type == 'rbd-mirror':
        entrypoint = '/usr/bin/rbd-mirror'
        name = 'client.rbd-mirror.%s' % daemon_id
    else:
        entrypoint = '/usr/bin/ceph-' + daemon_type
        name = '%s.%s' % (daemon_type, daemon_id)
    return CephContainer(
        image=args.image,
        entrypoint=entrypoint,
        args=[
            '-n', name,
            '-f', # foreground
        ] + get_daemon_args(fsid, daemon_type, daemon_id),
        container_args=container_args,
        volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
        cname='ceph-%s-%s.%s' % (fsid, daemon_type, daemon_id),
    )

def extract_uid_gid():
    # type: () -> Tuple[int, int]
    """
    Return the (uid, gid) of the 'ceph' user inside args.image, taken from
    the 3rd and 4th fields of its /etc/passwd entry.
    """
    passwd_line = CephContainer(
        image=args.image,
        entrypoint='/usr/bin/grep',
        args=['^ceph:', '/etc/passwd'],
    ).run()
    uid_field, gid_field = passwd_line.split(':')[2:4]
    return (int(uid_field), int(gid_field))

def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
                  config, keyring):
    # type: (str, str, Union[int, str], CephContainer, int, int, str, str) -> None
    """
    Create a daemon's on-disk state and install/start its systemd unit.

    A mon whose data dir does not exist yet is initialized with
    `ceph-mon --mkfs` in a one-off container, using temporary copies of
    *config* and *keyring*; any other daemon just gets its directories,
    config and keyring written.  An osd is additionally activated with
    `ceph-volume lvm activate` when args.osd_fsid is set.
    """
    if daemon_type == 'mon' and not os.path.exists(get_data_dir(fsid, 'mon',
                                                                daemon_id)):
        # tmp keyring/config files for --mkfs; leaving the with-block closes
        # (and so deletes) them as soon as mkfs is done, even if it fails
        with tempfile.NamedTemporaryFile(mode='w') as tmp_keyring, \
                tempfile.NamedTemporaryFile(mode='w') as tmp_config:
            for tmp, content in ((tmp_keyring, keyring), (tmp_config, config)):
                os.fchmod(tmp.fileno(), 0o600)
                os.fchown(tmp.fileno(), uid, gid)
                tmp.write(content)
                tmp.flush()

            # --mkfs
            create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid)
            mon_dir = get_data_dir(fsid, 'mon', daemon_id)
            log_dir = get_log_dir(fsid)
            CephContainer(
                image=args.image,
                entrypoint='/usr/bin/ceph-mon',
                args=['--mkfs',
                      '-i', str(daemon_id),
                      '--fsid', fsid,
                      '-c', '/tmp/config',
                      '--keyring', '/tmp/keyring',
                ] + get_daemon_args(fsid, 'mon', daemon_id),
                volume_mounts={
                    log_dir: '/var/log/ceph:z',
                    mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (daemon_id),
                    tmp_keyring.name: '/tmp/keyring:z',
                    tmp_config.name: '/tmp/config:z',
                },
            ).run()

        # write conf
        with open(mon_dir + '/config', 'w') as f:
            os.fchown(f.fileno(), uid, gid)
            os.fchmod(f.fileno(), 0o600)
            f.write(config)
    else:
        # dirs, conf, keyring
        create_daemon_dirs(
            fsid, daemon_type, daemon_id,
            uid, gid,
            config, keyring)

    if daemon_type == 'osd' and args.osd_fsid:
        pc = CephContainer(
            image=args.image,
            entrypoint='/usr/sbin/ceph-volume',
            args=[
                'lvm', 'activate',
                str(daemon_id), args.osd_fsid,
                '--no-systemd'
            ],
            container_args=['--privileged'],
            volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
            cname='ceph-%s-activate-%s.%s' % (fsid, daemon_type, daemon_id),
        )
        pc.run()

    deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c)

def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
                        enable=True, start=True):
    # type: (str, int, int, str, Union[int, str], CephContainer, bool, bool) -> None
    """
    Write the daemon's 'cmd' script and the cluster's systemd template
    unit, reload systemd, then stop/reset the daemon's unit and optionally
    enable and start it.
    """
    # cmd: the script the unit's ExecStart runs
    data_dir = get_data_dir(fsid, daemon_type, daemon_id)
    with open(data_dir + '/cmd', 'w') as f:
        f.write('#!/bin/sh\n' + ' '.join(c.run_cmd()) + '\n')
        os.fchmod(f.fileno(), 0o700)

    # systemd
    install_base_units(fsid)
    unit = get_unit_file(fsid, uid, gid)
    unit_path = args.unit_dir + '/' + 'ceph-%s@.service' % (fsid)
    with open(unit_path + '.new', 'w') as f:
        f.write(unit)
    # rename only after the file is closed (and thus flushed), so the final
    # name never refers to a partially-written unit
    os.rename(unit_path + '.new', unit_path)
    call_throws(['systemctl', 'daemon-reload'])

    unit_name = get_unit_name(fsid, daemon_type, daemon_id)
    call(['systemctl', 'stop', unit_name],
         verbose_on_failure=False)
    call(['systemctl', 'reset-failed', unit_name],
         verbose_on_failure=False)
    if enable:
        call_throws(['systemctl', 'enable', unit_name])
    if start:
        call_throws(['systemctl', 'start', unit_name])

def install_base_units(fsid):
    # type: (str) -> None
    """
    Set up ceph.target and ceph-$fsid.target units, plus a logrotate
    config for the cluster's log files.

    Unit files are written to '<name>.new' and renamed into place after the
    file is closed, so the final name never refers to a partially-written
    file.  Each target is enabled and started only when it is installed
    for the first time.
    """
    # global unit
    existed = os.path.exists(args.unit_dir + '/ceph.target')
    with open(args.unit_dir + '/ceph.target.new', 'w') as f:
        f.write('[Unit]\n'
                'Description=All Ceph clusters and services\n'
                '\n'
                '[Install]\n'
                'WantedBy=multi-user.target\n')
    os.rename(args.unit_dir + '/ceph.target.new',
              args.unit_dir + '/ceph.target')
    if not existed:
        # we disable before enable in case a different ceph.target
        # (from the traditional package) is present; while newer
        # systemd is smart enough to disable the old
        # (/lib/systemd/...) and enable the new (/etc/systemd/...),
        # some older versions of systemd error out with EEXIST.
        call_throws(['systemctl', 'disable', 'ceph.target'])
        call_throws(['systemctl', 'enable', 'ceph.target'])
        call_throws(['systemctl', 'start', 'ceph.target'])

    # cluster unit
    existed = os.path.exists(args.unit_dir + '/ceph-%s.target' % fsid)
    with open(args.unit_dir + '/ceph-%s.target.new' % fsid, 'w') as f:
        f.write('[Unit]\n'
                'Description=Ceph cluster {fsid}\n'
                'PartOf=ceph.target\n'
                'Before=ceph.target\n'
                '\n'
                '[Install]\n'
                'WantedBy=multi-user.target ceph.target\n'.format(
                    fsid=fsid)
        )
    os.rename(args.unit_dir + '/ceph-%s.target.new' % fsid,
              args.unit_dir + '/ceph-%s.target' % fsid)
    if not existed:
        call_throws(['systemctl', 'enable', 'ceph-%s.target' % fsid])
        call_throws(['systemctl', 'start', 'ceph-%s.target' % fsid])

    # logrotate for the cluster
    #
    # This is a bit sloppy in that the killall/pkill will touch all ceph
    # daemons in all containers, but I don't see an elegant way to send
    # SIGHUP *just* to the daemons for this cluster.  (1) systemd kill -s
    # will get the signal to podman, but podman will exit.  (2) podman kill
    # will get the signal to the first child (bash), but that isn't the ceph
    # daemon.  This is simpler and should be harmless.
    with open(args.logrotate_dir + '/ceph-%s' % fsid, 'w') as f:
        f.write("""# created by ceph-daemon
/var/log/ceph/%s/*.log {
    rotate 7
    daily
    compress
    sharedscripts
    postrotate
        killall -q -1 ceph-mon ceph-mgr ceph-mds ceph-osd ceph-fuse radosgw || pkill -1 -x "ceph-mon|ceph-mgr|ceph-mds|ceph-osd|ceph-fuse|radosgw" || true
    endscript
    missingok
    notifempty
    su root root
}
""" % fsid)

def deploy_crash(fsid, uid, gid, config, keyring):
    # type: (str, int, int, str, str) -> None
    """
    Deploy the per-cluster ceph-crash collector.

    Writes keyring and config (mode 0600, owned by uid:gid) into
    <args.data_dir>/<fsid>/crash, installs the unit
    ceph-<fsid>-crash.service running ceph-crash in a container, and then
    enables and starts it.
    """
    crash_dir = os.path.join(args.data_dir, fsid, 'crash')
    makedirs(crash_dir, uid, gid, DATA_DIR_MODE)

    with open(os.path.join(crash_dir, 'keyring'), 'w') as f:
        os.fchmod(f.fileno(), 0o600)
        os.fchown(f.fileno(), uid, gid)
        f.write(keyring)
    with open(os.path.join(crash_dir, 'config'), 'w') as f:
        os.fchmod(f.fileno(), 0o600)
        os.fchown(f.fileno(), uid, gid)
        f.write(config)

    # ceph-crash unit
    mounts = {
        crash_dir: '/var/lib/ceph/crash:z',
        os.path.join(crash_dir, 'config'): '/etc/ceph/ceph.conf:z',
        os.path.join(crash_dir, 'keyring'): '/etc/ceph/ceph.keyring:z',
    }
    c = CephContainer(
        image=args.image,
        entrypoint='/usr/bin/ceph-crash',
        args=['-n', 'client.crash.%s' % get_hostname()],
        volume_mounts=mounts,
        cname='ceph-%s-crash' % (fsid),
    )
    unit_name = 'ceph-%s-crash.service' % fsid
    unit_path = os.path.join(args.unit_dir, unit_name)
    with open(unit_path + '.new', 'w') as f:
        f.write('[Unit]\n'
                'Description=Ceph cluster {fsid} crash dump collector\n'
                'PartOf=ceph-{fsid}.target\n'
                'Before=ceph-{fsid}.target\n'
                '\n'
                '[Service]\n'
                'Type=simple\n'
                'ExecStart={cmd}\n'
                'ExecStop=-{container_path} stop ceph-{fsid}-crash\n'
                'Restart=always\n'
                'RestartSec=10\n'
                'StartLimitInterval=10min\n'
                'StartLimitBurst=10\n'
                '\n'
                '[Install]\n'
                'WantedBy=ceph-{fsid}.target\n'.format(
                    container_path=container_path,
                    fsid=fsid,
                    cmd=' '.join(c.run_cmd()))
        )
    # rename only after the file is closed (and thus flushed)
    os.rename(unit_path + '.new', unit_path)
    subprocess.check_output(['systemctl', 'enable', unit_name])
    subprocess.check_output(['systemctl', 'start', unit_name])

def get_unit_file(fsid, uid, gid):
    # type: (str, int, int) -> str
    """
    Render the text of the systemd template unit for cluster *fsid*.

    The instance name (%i) selects the daemon; ExecStart runs the 'cmd'
    script in <args.data_dir>/<fsid>/%i.  Before start, any stale container
    named ceph-<fsid>-%i is removed and /var/run/ceph/<fsid> is created
    (mode 0770, owned by uid:gid).
    """
    template = """[Unit]
Description=Ceph daemon for {fsid}

# According to:
#   http://www.freedesktop.org/wiki/Software/systemd/NetworkTarget
# these can be removed once ceph-mon will dynamically change network
# configuration.
After=network-online.target local-fs.target time-sync.target
Wants=network-online.target local-fs.target time-sync.target

PartOf=ceph-{fsid}.target
Before=ceph-{fsid}.target

[Service]
LimitNOFILE=1048576
LimitNPROC=1048576
EnvironmentFile=-/etc/environment
ExecStartPre=-{container_path} rm ceph-{fsid}-%i
ExecStartPre=-{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}
ExecStart=/bin/bash {data_dir}/{fsid}/%i/cmd
ExecStop=-{container_path} stop ceph-{fsid}-%i
Restart=on-failure
RestartSec=10s
TimeoutStartSec=120
TimeoutStopSec=15
StartLimitInterval=30min
StartLimitBurst=5

[Install]
WantedBy=ceph-{fsid}.target
"""
    values = {
        'container_path': container_path,
        'install_path': find_program('install'),
        'fsid': fsid,
        'uid': uid,
        'gid': gid,
        'data_dir': args.data_dir,
    }
    return template.format(**values)

##################################

class CephContainer:
    """
    A single run of a ceph container image and the command lines needed to
    run it (run_cmd), open a shell-style command in a fresh container
    (shell_cmd) or exec into the running container (exec_cmd).
    """
    def __init__(self,
                 image,
                 entrypoint,
                 args=None,
                 volume_mounts=None,
                 cname='',
                 container_args=None):
        # type: (str, str, Optional[List[str]], Optional[Dict[str, str]], str, Optional[List[str]]) -> None
        self.image = image
        self.entrypoint = entrypoint
        # None defaults: the former `[]`/`{}` defaults were single shared
        # objects, so a change through one instance leaked into all others
        self.args = args if args is not None else []  # type: List[str]
        self.volume_mounts = \
            volume_mounts if volume_mounts is not None else {}  # type: Dict[str, str]
        self.cname = cname
        self.container_args = \
            container_args if container_args is not None else []  # type: List[str]

    def _volume_args(self):
        # type: () -> List[str]
        """Return ['-v', '<host>:<container>', ...] for every volume mount."""
        vols = []  # type: List[str]
        for host_dir, container_dir in self.volume_mounts.items():
            vols += ['-v', '%s:%s' % (host_dir, container_dir)]
        return vols

    def _env_args(self):
        # type: () -> List[str]
        """Return the -e arguments exporting CONTAINER_IMAGE and NODE_NAME."""
        return [
            '-e', 'CONTAINER_IMAGE=%s' % self.image,
            '-e', 'NODE_NAME=%s' % get_hostname(),
        ]

    def run_cmd(self):
        # type: () -> List[str]
        """Return the argv that runs the entrypoint in a --rm container."""
        cname = ['--name', self.cname] if self.cname else []
        return [
            str(container_path),
            'run',
            '--rm',
            '--net=host',
        ] + self.container_args + cname + self._env_args() + \
            self._volume_args() + [
            '--entrypoint', self.entrypoint,
            self.image
        ] + self.args

    def shell_cmd(self, cmd):
        # type: (List[str]) -> List[str]
        """
        Return the argv that runs cmd[0] (with cmd[1:] as arguments) as the
        entrypoint of a new container from the same image and mounts.
        *cmd* must be non-empty.
        """
        return [
            str(container_path),
            'run',
            '--net=host',
        ] + self.container_args + self._env_args() + self._volume_args() + [
            '--entrypoint', cmd[0],
            self.image
        ] + cmd[1:]

    def exec_cmd(self, cmd):
        # type: (List[str]) -> List[str]
        """Return the argv that execs *cmd* in the container named cname."""
        return [
            str(container_path),
            'exec',
        ] + self.container_args + [
            self.cname,
        ] + cmd

    def run(self):
        # type: () -> str
        """Run the container to completion and return its stdout.

        :raises RuntimeError: (from call_throws) on a non-zero exit status
        """
        cmd = self.run_cmd()  # build once: it is both logged and executed
        logger.debug(cmd)
        out, _, _ = call_throws(cmd, desc=self.entrypoint)
        return out


##################################

def command_version():
    # type: () -> int
    """Print the output of `ceph --version` from args.image; return 0."""
    version = CephContainer(args.image, 'ceph', ['--version']).run()
    print(version.strip())
    return 0

##################################

def command_bootstrap():
    # type: () -> int
    """
    Bootstrap a new cluster on this host.

    Creates a mon, a mgr and a crash agent.  Unless skipped via the command
    line, it also enables the ssh orchestrator backend and the dashboard.
    It writes the admin keyring, the (possibly minimized) config and, when
    ssh is enabled, the public ssh key to the requested output paths.

    :return: 0 on success
    :raises Error: if an output file already exists without
        --allow-overwrite, if neither --mon-ip nor --mon-addrv was given,
        or if the mon IP cannot be pinged
    """

    # verify output files
    if not args.allow_overwrite:
        for f in [args.output_config, args.output_keyring, args.output_pub_ssh_key]:
            if os.path.exists(f):
                raise Error('%s already exists; delete or pass '
                              '--allow-overwrite to overwrite' % f)

    # initial vars
    fsid = args.fsid or make_fsid()
    hostname = get_hostname()
    mon_id = args.mon_id or hostname
    mgr_id = args.mgr_id or hostname
    logger.info('Cluster fsid: %s' % fsid)

    # config
    cp = ConfigParser()
    if args.config:
        cp.read(args.config)
    if args.mon_ip:
        addr_arg = '[v2:%s:3300,v1:%s:6789]' % (args.mon_ip, args.mon_ip)
        mon_ip = args.mon_ip
    elif args.mon_addrv:
        addr_arg = args.mon_addrv
        # NOTE(review): takes the host part of the first entry; assumes an
        # IPv4 address (an IPv6 address contains ':' itself)
        mon_ip = args.mon_addrv.split(':')[1]
    else:
        raise Error('must specify --mon-ip or --mon-addrv')
    if not cp.has_section('global'):
        cp.add_section('global')
    cp.set('global', 'fsid', fsid)
    cp.set('global', 'mon host', addr_arg)
    cp.set('global', 'container_image', args.image)
    cpf = StringIO()
    cp.write(cpf)
    config = cpf.getvalue()

    if not args.skip_ping_check:
        logger.info('Verifying we can ping mon IP %s...' % mon_ip)
        _, _, ret = call(['timeout', '5', 'ping', mon_ip, '-c', '1'], 'ping')
        if ret:
            raise Error('failed to ping %s' % mon_ip)

    if not args.skip_pull:
        logger.info('Pulling latest %s container...' % args.image)
        call_throws([container_path, 'pull', args.image])

    logger.info('Extracting ceph user uid/gid from container image...')
    (uid, gid) = extract_uid_gid()

    def gen_key():
        # type: () -> str
        """Generate one fresh secret key with ceph-authtool in the image."""
        return CephContainer(
            image=args.image,
            entrypoint='/usr/bin/ceph-authtool',
            args=['--gen-print-key'],
        ).run().strip()

    # create some initial keys
    logger.info('Creating initial keys...')
    mon_key = gen_key()
    admin_key = gen_key()
    mgr_key = gen_key()
    crash_key = gen_key()

    keyring = ('[mon.]\n'
               '\tkey = %s\n'
               '\tcaps mon = allow *\n'
               '[client.admin]\n'
               '\tkey = %s\n'
               '\tcaps mon = allow *\n'
               '\tcaps mds = allow *\n'
               '\tcaps mgr = allow *\n'
               '\tcaps osd = allow *\n'
               '[mgr.%s]\n'
               '\tkey = %s\n'
               '\tcaps mon = profile mgr\n'
               '\tcaps mds = allow *\n'
               '\tcaps osd = allow *\n'
               '[client.crash.%s]\n'
               '\tkey = %s\n'
               '\tcaps mon = profile crash\n'
               '\tcaps mgr = profile crash\n'
               % (mon_key, admin_key, mgr_id, mgr_key, hostname, crash_key))

    # tmp keyring file (owned by the container's ceph uid/gid, mode 0600)
    tmp_bootstrap_keyring = tempfile.NamedTemporaryFile(mode='w')
    os.fchmod(tmp_bootstrap_keyring.fileno(), 0o600)
    os.fchown(tmp_bootstrap_keyring.fileno(), uid, gid)
    tmp_bootstrap_keyring.write(keyring)
    tmp_bootstrap_keyring.flush()

    # create initial monmap, tmp monmap file
    logger.info('Creating initial monmap...')
    tmp_monmap = tempfile.NamedTemporaryFile(mode='w')
    os.fchmod(tmp_monmap.fileno(), 0o644)
    CephContainer(
        image=args.image,
        entrypoint='/usr/bin/monmaptool',
        args=['--create',
              '--clobber',
              '--fsid', fsid,
              '--addv', mon_id, addr_arg,
              '/tmp/monmap'
        ],
        volume_mounts={
            tmp_monmap.name: '/tmp/monmap:z',
        },
    ).run()

    # create mon
    logger.info('Creating mon...')
    create_daemon_dirs(fsid, 'mon', mon_id, uid, gid)
    mon_dir = get_data_dir(fsid, 'mon', mon_id)
    log_dir = get_log_dir(fsid)
    CephContainer(
        image=args.image,
        entrypoint='/usr/bin/ceph-mon',
        args=['--mkfs',
              '-i', mon_id,
              '--fsid', fsid,
              '-c', '/dev/null',
              '--monmap', '/tmp/monmap',
              '--keyring', '/tmp/keyring',
        ] + get_daemon_args(fsid, 'mon', mon_id),
        volume_mounts={
            log_dir: '/var/log/ceph:z',
            mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (mon_id),
            tmp_bootstrap_keyring.name: '/tmp/keyring:z',
            tmp_monmap.name: '/tmp/monmap:z',
        },
    ).run()

    with open(mon_dir + '/config', 'w') as f:
        os.fchown(f.fileno(), uid, gid)
        os.fchmod(f.fileno(), 0o600)
        f.write(config)

    mon_c = get_container(fsid, 'mon', mon_id)
    deploy_daemon_units(fsid, uid, gid, 'mon', mon_id, mon_c)

    # client.admin key + config to issue various CLI commands
    tmp_admin_keyring = tempfile.NamedTemporaryFile(mode='w')
    os.fchmod(tmp_admin_keyring.fileno(), 0o600)
    os.fchown(tmp_admin_keyring.fileno(), uid, gid)
    tmp_admin_keyring.write('[client.admin]\n'
                      '\tkey = ' + admin_key + '\n')
    tmp_admin_keyring.flush()

    tmp_config = tempfile.NamedTemporaryFile(mode='w')
    os.fchmod(tmp_config.fileno(), 0o600)
    os.fchown(tmp_config.fileno(), uid, gid)
    tmp_config.write(config)
    tmp_config.flush()

    # a CLI helper to reduce our typing
    def cli(cmd, extra_mounts=None):
        # type: (List[str], Optional[Dict[str, str]]) -> str
        """Run ``ceph <cmd>`` as client.admin and return its stdout."""
        mounts = {
            log_dir: '/var/log/ceph:z',
            tmp_admin_keyring.name: '/etc/ceph/ceph.client.admin.keyring:z',
            tmp_config.name: '/etc/ceph/ceph.conf:z',
        }
        if extra_mounts:
            mounts.update(extra_mounts)
        return CephContainer(
            image=args.image,
            entrypoint='/usr/bin/ceph',
            args=cmd,
            volume_mounts=mounts,
        ).run()

    logger.info('Waiting for mon to start...')
    while True:
        c = CephContainer(
            image=args.image,
            entrypoint='/usr/bin/ceph',
            args=[
                'status'],
            volume_mounts={
                mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (mon_id),
                tmp_admin_keyring.name: '/etc/ceph/ceph.client.admin.keyring:z',
                tmp_config.name: '/etc/ceph/ceph.conf:z',
            },
        )
        # use call() (not cli()) so a failing 'ceph status' is retried
        # rather than raised
        _, _, ret = call(c.run_cmd(), c.entrypoint)
        if ret == 0:
            break
        logger.info('mon is still not available yet, waiting...')
        time.sleep(1)

    # assimilate and minimize config
    if not args.no_minimize_config:
        logger.info('Assimilating anything we can from ceph.conf...')
        cli([
            'config', 'assimilate-conf',
            '-i', '/var/lib/ceph/mon/ceph-%s/config' % mon_id
        ], {
            mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % mon_id
        })
        logger.info('Generating new minimal ceph.conf...')
        cli([
            'config', 'generate-minimal-conf',
            '-o', '/var/lib/ceph/mon/ceph-%s/config' % mon_id
        ], {
            mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % mon_id
        })
        # re-read our minimized config
        with open(mon_dir + '/config', 'r') as f:
            config = f.read()
        logger.info('Restarting the monitor...')
        call_throws([
            'systemctl',
            'restart',
            get_unit_name(fsid, 'mon', mon_id)
        ])

    # create mgr
    logger.info('Creating mgr...')
    mgr_keyring = '[mgr.%s]\n\tkey = %s\n' % (mgr_id, mgr_key)
    mgr_c = get_container(fsid, 'mgr', mgr_id)
    deploy_daemon(fsid, 'mgr', mgr_id, mgr_c, uid, gid, config, mgr_keyring)

    # crash unit
    logger.info('Creating crash agent...')
    deploy_crash(fsid, uid, gid, config,
                 '[client.crash.%s]\n\tkey = %s\n' % (hostname, crash_key))

    # output files
    with open(args.output_keyring, 'w') as f:
        os.fchmod(f.fileno(), 0o600)
        f.write('[client.admin]\n'
                '\tkey = ' + admin_key + '\n')
    logger.info('Wrote keyring to %s' % args.output_keyring)

    with open(args.output_config, 'w') as f:
        f.write(config)
    logger.info('Wrote config to %s' % args.output_config)

    logger.info('Waiting for mgr to start...')
    while True:
        out = cli(['status', '-f', 'json-pretty'])
        j = json.loads(out)
        if j.get('mgrmap', {}).get('available', False):
            break
        logger.info('mgr is still not available yet, waiting...')
        time.sleep(1)

    # ssh
    if not args.skip_ssh:
        logger.info('Enabling ssh module...')
        cli(['mgr', 'module', 'enable', 'ssh'])
        logger.info('Setting orchestrator backend to ssh...')
        cli(['orchestrator', 'set', 'backend', 'ssh'])

        logger.info('Generating ssh key...')
        cli(['ssh', 'generate-key'])
        ssh_pub = cli(['ssh', 'get-pub-key'])

        with open(args.output_pub_ssh_key, 'w') as f:
            f.write(ssh_pub)
        logger.info('Wrote public SSH key to %s' % args.output_pub_ssh_key)

        logger.info('Adding key to root@localhost\'s authorized_keys...')
        if not os.path.exists('/root/.ssh'):
            os.mkdir('/root/.ssh', 0o700)
        auth_keys_file = '/root/.ssh/authorized_keys'
        # append a newline first if the existing file does not end in one,
        # so our key does not get glued onto the previous entry
        add_newline = False
        if os.path.exists(auth_keys_file):
            with open(auth_keys_file, 'r') as f:
                f.seek(0, os.SEEK_END)
                if f.tell() > 0:
                    f.seek(f.tell()-1, os.SEEK_SET) # go to last char
                    if f.read() != '\n':
                        add_newline = True
        with open(auth_keys_file, 'a') as f:
            os.fchmod(f.fileno(), 0o600)  # just in case we created it
            if add_newline:
                f.write('\n')
            f.write(ssh_pub.strip() + '\n')

        host = get_hostname()
        logger.info('Adding host %s...' % host)
        cli(['orchestrator', 'host', 'add', host])

    if not args.skip_dashboard:
        logger.info('Enabling the dashboard module...')
        cli(['mgr', 'module', 'enable', 'dashboard'])
        logger.info('Waiting for the module to be available...')
        # FIXME: potential for an endless loop?
        while True:
            c_out = cli(['-h'])
            if 'dashboard' in c_out:
                break
            logger.info('Dashboard not yet available, waiting...')
            time.sleep(1)
        logger.info('Generating a dashboard self-signed certificate...')
        cli(['dashboard', 'create-self-signed-cert'])
        logger.info('Creating initial admin user...')
        password = args.initial_dashboard_password or generate_password()
        cli(['dashboard', 'ac-user-create',
             args.initial_dashboard_user, password,
             'administrator'])
        logger.info('Fetching dashboard port number...')
        out = cli(['config', 'get', 'mgr', 'mgr/dashboard/ssl_server_port'])
        port = int(out)
        logger.info('Ceph Dashboard is now available at:\n\n'
                    '\t     URL: https://%s:%s/\n'
                    '\t    User: %s\n'
                    '\tPassword: %s\n' % (
                        get_fqdn(), port,
                        args.initial_dashboard_user,
                        password))

    logger.info('You can access the Ceph CLI with:\n\n'
                '\tsudo %s shell --fsid %s -c %s -k %s\n' % (
                    sys.argv[0],
                    fsid,
                    args.output_config,
                    args.output_keyring))

    logger.info('Bootstrap complete.')
    return 0

##################################

def command_deploy():
    # type: () -> None
    """
    Deploy the single daemon named by ``args.name`` (``type.id``).

    For a mon, a ``[mon.<id>]`` section that pins its public address,
    address vector or network is appended to the config.  When a crash
    keyring was supplied, the crash agent is deployed as well.

    :raises Error: for an unrecognized daemon type, or a mon with no
        --mon-ip / --mon-addrv / --mon-network
    """
    (daemon_type, daemon_id) = args.name.split('.', 1)
    if daemon_type not in ['mon', 'mgr', 'mds', 'osd', 'rgw', 'rbd-mirror']:
        raise Error('daemon type %s not recognized' % daemon_type)
    (config, keyring, crash_keyring) = get_config_and_both_keyrings()
    if daemon_type == 'mon':
        if args.mon_ip:
            mon_section = '[mon.%s]\n\tpublic_addr = %s\n' % (daemon_id,
                                                              args.mon_ip)
        elif args.mon_addrv:
            mon_section = '[mon.%s]\n\tpublic_addrv = %s\n' % (daemon_id,
                                                               args.mon_addrv)
        elif args.mon_network:
            mon_section = '[mon.%s]\n\tpublic_network = %s\n' % (
                daemon_id, args.mon_network)
        else:
            raise Error('must specify --mon-ip, --mon-addrv or --mon-network')
        # The section header must start on its own line; otherwise it would
        # be glued onto the last line of a config lacking a trailing newline.
        if config and not config.endswith('\n'):
            config += '\n'
        config += mon_section
    (uid, gid) = extract_uid_gid()
    c = get_container(args.fsid, daemon_type, daemon_id)
    deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
                  config, keyring)
    if crash_keyring:
        deploy_crash(args.fsid, uid, gid, config, crash_keyring)

##################################

def command_run():
    # type: () -> int
    """
    Run daemon ``args.name`` (``type.id``) in the foreground.

    :return: exit status of the container engine process
    """
    daemon_type, daemon_id = args.name.split('.', 1)
    container = get_container(args.fsid, daemon_type, daemon_id)
    run_argv = container.run_cmd()
    logger.debug("Running command: %s" % ' '.join(run_argv))
    return subprocess.call(run_argv)

##################################

@infer_fsid
def command_shell():
    # type: () -> int
    """
    Start a privileged container with the mounts of a daemon type.

    The daemon is ``args.name``, either ``type`` or ``type.id``, defaulting
    to ``osd``.  ``args.command`` is run in it, or an interactive bash if
    no command was given.

    :return: exit status of the container engine process
    """
    if args.fsid:
        make_log_dir(args.fsid)

    # with no --name, use osd: it gets the most mounts
    daemon_type = 'osd'
    daemon_id = None  # type: Optional[str]
    if args.name:
        if '.' in args.name:
            daemon_type, daemon_id = args.name.split('.', 1)
        else:
            daemon_type = args.name

    mounts = get_container_mounts(args.fsid, daemon_type, daemon_id)
    if args.config:
        mounts[pathify(args.config)] = '/etc/ceph/ceph.conf:z'
    if args.keyring:
        mounts[pathify(args.keyring)] = '/etc/ceph/ceph.keyring:z'

    container_args = ['--privileged']
    command = args.command
    if not command:
        # interactive session with a recognizable prompt
        command = ['bash']
        container_args.extend([
            '-it',
            '-e', 'LANG=C',
            '-e', "PS1=%s" % CUSTOM_PS1,
        ])

    shell = CephContainer(
        image=args.image,
        entrypoint='doesnotmatter',
        args=[],
        container_args=container_args,
        volume_mounts=mounts)
    shell_argv = shell.shell_cmd(command)
    logger.debug("Running command: %s" % ' '.join(shell_argv))
    return subprocess.call(shell_argv)

##################################

@infer_fsid
def command_enter():
    # type: () -> int
    """
    Execute a command inside the running container of daemon ``args.name``.

    The command is ``args.command``, or an interactive bash when none was
    given, and it is run with ``<engine> exec``.

    :return: exit status of the container engine process
    :raises Error: if ``args.fsid`` is unset
    """
    if not args.fsid:
        raise Error('must pass --fsid to specify cluster')
    daemon_type, daemon_id = args.name.split('.', 1)

    container_args = []  # type: List[str]
    command = args.command
    if not command:
        command = ['bash']
        container_args.extend([
            '-it',
            '-e', 'LANG=C',
            '-e', "PS1=%s" % CUSTOM_PS1,
        ])

    container = get_container(args.fsid, daemon_type, daemon_id,
                              container_args=container_args)
    exec_argv = container.exec_cmd(command)
    logger.debug("Running command: %s" % ' '.join(exec_argv))
    return subprocess.call(exec_argv)

##################################

@infer_fsid
def command_ceph_volume():
    # type: () -> None
    """
    Run ``ceph-volume <args.command>`` in a privileged container.

    The container gets the osd mounts.  When ``--config-and-keyring`` is
    given, the config and bootstrap-osd keyring are written to private
    (mode 0600) temporary files and mounted into the container.  Those files
    are closed, and thereby deleted, as soon as the command finishes, even
    if it fails.
    """
    if args.fsid:
        make_log_dir(args.fsid)

    mounts = get_container_mounts(args.fsid, 'osd', None)

    tmp_config = None
    tmp_keyring = None
    try:
        if args.config_and_keyring:
            # note: this will always pull from args.config_and_keyring (we
            # require it) and never args.config or args.keyring.
            (config, keyring) = get_config_and_keyring()

            # tmp keyring file
            tmp_keyring = tempfile.NamedTemporaryFile(mode='w')
            os.fchmod(tmp_keyring.fileno(), 0o600)
            tmp_keyring.write(keyring)
            tmp_keyring.flush()

            # tmp config file
            tmp_config = tempfile.NamedTemporaryFile(mode='w')
            os.fchmod(tmp_config.fileno(), 0o600)
            tmp_config.write(config)
            tmp_config.flush()

            mounts[tmp_config.name] = '/etc/ceph/ceph.conf:z'
            mounts[tmp_keyring.name] = '/var/lib/ceph/bootstrap-osd/ceph.keyring:z'

        c = CephContainer(
            image=args.image,
            entrypoint='/usr/sbin/ceph-volume',
            args=args.command,
            container_args=['--privileged'],
            volume_mounts=mounts,
        )
        out, err, code = call_throws(c.run_cmd(), verbose=True)
        if not code:
            print(out)
    finally:
        # deterministic cleanup instead of relying on garbage collection
        for tmp_file in (tmp_config, tmp_keyring):
            if tmp_file is not None:
                tmp_file.close()

##################################

@infer_fsid
def command_unit():
    # type: () -> None
    """
    Apply ``systemctl <args.command>`` to the unit of daemon ``args.name``.

    :raises Error: if ``args.fsid`` is unset
    """
    if not args.fsid:
        raise Error('must pass --fsid to specify cluster')
    daemon_type, daemon_id = args.name.split('.', 1)
    target_unit = get_unit_name(args.fsid, daemon_type, daemon_id)
    call_throws(['systemctl', args.command, target_unit])

##################################

@infer_fsid
def command_logs():
    # type: () -> None
    """
    Show the container engine's log output for daemon ``args.name``.

    ``--follow`` and ``--tail`` are passed through to ``<engine> logs``.

    :raises Error: if ``args.fsid`` is unset
    """
    if not args.fsid:
        raise Error('must pass --fsid to specify cluster')

    logs_argv = [str(container_path), 'logs']  # type: List[str]
    if args.follow:
        logs_argv += ['-f']
    if args.tail:
        logs_argv += ['--tail=' + str(args.tail)]
    logs_argv += ['ceph-%s-%s' % (args.fsid, args.name)]

    # Invoke subprocess directly rather than through our call() wrapper, so
    # the log output reaches stdout untouched, without logger prefixing.
    logger.debug("Running command: %s" % ' '.join(logs_argv))
    subprocess.call(logs_argv) # type: ignore

##################################

def command_ls():
    # type: () -> None
    """Print the daemons found on this host as indented JSON."""
    include_detail = not args.no_detail
    daemons = list_daemons(detail=include_detail,
                           legacy_dir=args.legacy_dir)
    print(json.dumps(daemons, indent=4))

def list_daemons(detail=True, legacy_dir=None):
    # type: (bool, Optional[str]) -> List[Dict[str, str]]
    """
    Enumerate the ceph daemons found under the data dir.

    Two layouts are recognized:

    * legacy: ``<data_dir>/{mon,osd,mds,mgr}/<cluster>-<id>``
    * ceph-daemon: ``<data_dir>/<fsid>/<type>.<id>`` and ``<fsid>/crash``

    :param detail: if true, also report systemd unit enabled/state.  For
        legacy daemons it adds the host's ``ceph -v`` version; for
        ceph-daemon ones it adds the container id, image name/id and the
        version reported inside the container.
    :param legacy_dir: optional base directory prefixed to the data dir
    :return: one dict per daemon found
    """
    host_version = None
    ls = []

    data_dir = args.data_dir
    if legacy_dir is not None:
        data_dir = os.path.abspath(legacy_dir + data_dir)

    # /var/lib/ceph
    if os.path.exists(data_dir):
        for entry in os.listdir(data_dir):
            if entry in ['mon', 'osd', 'mds', 'mgr']:
                daemon_type = entry
                for j in os.listdir(os.path.join(data_dir, entry)):
                    if '-' not in j:
                        continue
                    (cluster, daemon_id) = j.split('-', 1)
                    fsid = get_legacy_daemon_fsid(
                            cluster, daemon_type, daemon_id,
                            legacy_dir=legacy_dir)
                    info = {
                        'style': 'legacy',
                        'name': '%s.%s' % (daemon_type, daemon_id),
                        'fsid': fsid if fsid is not None else 'unknown',
                    }
                    if detail:
                        (info['enabled'], info['state']) = check_unit(
                            'ceph-%s@%s' % (daemon_type, daemon_id))
                        if not host_version:
                            # best effort: the host may not have ceph installed
                            try:
                                out, err, code = call(['ceph', '-v'])
                                if not code and out.startswith('ceph version '):
                                    host_version = out.split(' ')[2]
                            except Exception:
                                pass
                        info['host_version'] = host_version
                    ls.append(info)
            elif is_fsid(entry):
                fsid = entry
                for j in os.listdir(os.path.join(data_dir, entry)):
                    if j == 'crash':
                        name = 'crash'
                        unit_name = 'ceph-%s-crash.service' % fsid
                    elif '.' in j:
                        name = j
                        (daemon_type, daemon_id) = j.split('.', 1)
                        unit_name = get_unit_name(fsid,
                                                  daemon_type,
                                                  daemon_id)
                    else:
                        continue
                    info = {
                        'style': 'ceph-daemon:v1',
                        'name': name,
                        'fsid': fsid,
                    }
                    if detail:
                        (info['enabled'], info['state']) = check_unit(unit_name)
                        # get container id, image and in-container version
                        container_id = None
                        image_name = None
                        image_id = None
                        version = None
                        out, err, code = call(
                            [
                                container_path, 'inspect',
                                '--format', '{{.Id}},{{.Config.Image}},{{.Image}}',
                                'ceph-%s-%s' % (fsid, j)
                            ],
                            verbose_on_failure=False)
                        if not code:
                            (container_id, image_name, image_id) = out.strip().split(',')
                            out, err, code = call(
                                [container_path, 'exec', container_id,
                                 'ceph', '-v'])
                            if not code and out.startswith('ceph version '):
                                version = out.split(' ')[2]
                        info['container_id'] = container_id
                        info['container_image_name'] = image_name
                        info['container_image_id'] = image_id
                        info['version'] = version
                    ls.append(info)

    # /var/lib/rook
    # WRITE ME
    return ls


##################################

def command_adopt():
    # type: () -> None
    """
    Adopt a daemon deployed by another tool into the ceph-daemon layout.

    Only ``--style legacy`` is supported.  It stops and disables the old
    ``ceph-<type>@<id>`` unit and moves the data directory contents (and
    the logs) into the fsid-based layout.  It also copies the cluster
    config, then creates, enables and (if the old unit was running) starts
    the new unit.

    :raises Error: if the legacy fsid cannot be determined, or the style is
        not implemented
    """
    (daemon_type, daemon_id) = args.name.split('.', 1)
    (uid, gid) = extract_uid_gid()
    if args.style == 'legacy':
        fsid = get_legacy_daemon_fsid(args.cluster,
                                      daemon_type,
                                      daemon_id,
                                      legacy_dir=args.legacy_dir)
        if not fsid:
            raise Error('could not detect legacy fsid; set fsid in ceph.conf')

        # NOTE: implicit assumption here that the units correspond to the
        # cluster we are adopting based on the /etc/{defaults,sysconfig}/ceph
        # CLUSTER field.
        unit_name = 'ceph-%s@%s' % (daemon_type, daemon_id)
        (enabled, state) = check_unit(unit_name)

        if state == 'running':
            logger.info('Stopping old systemd unit %s...' % unit_name)
            call_throws(['systemctl', 'stop', unit_name])
        if enabled:
            logger.info('Disabling old systemd unit %s...' % unit_name)
            call_throws(['systemctl', 'disable', unit_name])

        # data
        logger.info('Moving data...')
        data_dir_src = ('/var/lib/ceph/%s/%s-%s' %
                        (daemon_type, args.cluster, daemon_id))
        data_dir_src = os.path.abspath(args.legacy_dir + data_dir_src)
        data_dir_dst = make_data_dir(fsid, daemon_type, daemon_id)
        # os.listdir (unlike a '*' glob) also returns dotfiles; they must be
        # moved too, or the os.rmdir() below fails with "Directory not empty".
        for data_name in os.listdir(data_dir_src):
            move_file(os.path.join(data_dir_src, data_name), data_dir_dst,
                      uid=uid, gid=gid)
        logger.debug('Remove dir \'%s\'' % (data_dir_src))
        if os.path.ismount(data_dir_src):
            call_throws(['umount', data_dir_src])
        os.rmdir(data_dir_src)

        # config
        config_src = '/etc/ceph/%s.conf' % (args.cluster)
        config_src = os.path.abspath(args.legacy_dir + config_src)
        config_dst = os.path.join(data_dir_dst, 'config')
        copy_file(config_src, config_dst, uid=uid, gid=gid)

        # logs (a glob pattern: the daemon log plus any rotated copies)
        logger.info('Moving logs...')
        log_dir_src = ('/var/log/ceph/%s-%s.%s.log*' %
                        (args.cluster, daemon_type, daemon_id))
        log_dir_src = os.path.abspath(args.legacy_dir + log_dir_src)
        log_dir_dst = make_log_dir(fsid, uid=uid, gid=gid)
        for log_file in iglob(log_dir_src):
            move_file(log_file, log_dir_dst, uid=uid, gid=gid)

        logger.info('Creating new units...')
        c = get_container(fsid, daemon_type, daemon_id)
        deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
                            enable=True,  # unconditionally enable the new unit
                            start=(state == 'running'))
    else:
        raise Error('adoption of style %s not implemented' % args.style)

##################################

def command_rm_daemon():
    # type: () -> None
    """
    Stop, reset and disable the unit of daemon ``args.name``.

    The daemon's data directory is then deleted.  Removing a mon or an osd
    requires ``--force``.  The systemctl steps are best effort; the data
    removal goes through ``call_throws``.

    :raises Error: for a mon/osd without ``--force``
    """
    daemon_type, daemon_id = args.name.split('.', 1)
    if not args.force and daemon_type in ['mon', 'osd']:
        raise Error('must pass --force to proceed: '
                      'this command may destroy precious data!')
    unit_name = get_unit_name(args.fsid, daemon_type, daemon_id)
    for action in ('stop', 'reset-failed', 'disable'):
        call(['systemctl', action, unit_name],
             verbose_on_failure=False)
    call_throws(['rm', '-rf',
                 get_data_dir(args.fsid, daemon_type, daemon_id)])

##################################

def command_rm_cluster():
    # type: () -> None
    """
    Remove every trace of cluster ``args.fsid`` from this host.

    This covers:

    * the daemon units, cluster target, crash unit and slice;
    * the unit files and the ``.wants`` symlinks;
    * the data and log directories;
    * the logrotate config.

    Requires ``--force``.

    :raises Error: without ``--force``
    """
    if not args.force:
        raise Error('must pass --force to proceed: '
                      'this command may destroy precious data!')

    def _stop_and_disable(unit_name):
        # type: (str) -> None
        """Best-effort stop, reset-failed and disable of one systemd unit."""
        for action in ('stop', 'reset-failed', 'disable'):
            call(['systemctl', action, unit_name],
                 verbose_on_failure=False)

    # stop + disable individual daemon units
    for d in list_daemons(detail=False):
        if d['fsid'] != args.fsid:
            continue
        if d['style'] != 'ceph-daemon:v1':
            continue
        # NOTE(review): relies on get_unit_name accepting a full 'type.id'
        # name as its second argument
        _stop_and_disable(get_unit_name(args.fsid, d['name']))

    # cluster units
    for unit_name in ['ceph-%s.target' % args.fsid,
                      'ceph-%s-crash.service' % args.fsid]:
        _stop_and_disable(unit_name)

    # systemd escapes '-' as '\x2d' in slice names
    slice_name = 'system-%s.slice' % (('ceph-%s' % args.fsid).replace('-',
                                                                      '\\x2d'))
    call(['systemctl', 'stop', slice_name],
         verbose_on_failure=False)

    # rm units
    call_throws(['rm', '-f', args.unit_dir +
                             '/ceph-%s@.service' % args.fsid])
    call_throws(['rm', '-f', args.unit_dir +
                             '/ceph-%s-crash.service' % args.fsid])
    call_throws(['rm', '-f', args.unit_dir +
                             '/ceph-%s.target' % args.fsid])
    call_throws(['rm', '-rf',
                  args.unit_dir + '/ceph-%s.target.wants' % args.fsid])
    # rm leftover instance symlinks in other *.wants dirs.  The glob must be
    # expanded here: subprocess does not run a shell, so a '*' passed to rm
    # would be taken literally.
    for wants_link in iglob(args.unit_dir +
                            '/*.wants/ceph-%s@*' % args.fsid):
        call_throws(['rm', '-rf', wants_link])
    # rm data
    call_throws(['rm', '-rf', args.data_dir + '/' + args.fsid])
    # rm logs
    call_throws(['rm', '-rf', args.log_dir + '/' + args.fsid])
    # rm logrotate config
    call_throws(['rm', '-f', args.logrotate_dir + '/ceph-%s' % args.fsid])


##################################

def _get_parser():
    # type: () -> argparse.ArgumentParser
    """
    Build the ceph-daemon command-line parser.

    Global options (image, container engine, base directories, verbosity)
    are defined on the top-level parser. Each sub-command is a subparser
    that registers its handler via ``set_defaults(func=...)``. The caller
    dispatches on ``args.func``.

    Subparsers are not marked required. When no sub-command is given, the
    parsed namespace simply has no ``func`` attribute, and the caller must
    check for that.

    :return: the configured ``argparse.ArgumentParser``
    """
    parser = argparse.ArgumentParser(
        description='Bootstrap Ceph daemons with systemd and containers.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--image',
        default=DEFAULT_IMAGE,
        help='container image')
    parser.add_argument(
        '--docker',
        action='store_true',
        help='use docker instead of podman')
    parser.add_argument(
        '--data-dir',
        default=DATA_DIR,
        help='base directory for daemon data')
    parser.add_argument(
        '--log-dir',
        default=LOG_DIR,
        help='base directory for daemon logs')
    parser.add_argument(
        '--logrotate-dir',
        default=LOGROTATE_DIR,
        help='location of logrotate configuration files')
    parser.add_argument(
        '--unit-dir',
        default=UNIT_DIR,
        help='base directory for systemd units')
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Show debug-level log messages')
    subparsers = parser.add_subparsers(help='sub-command')

    parser_version = subparsers.add_parser(
        'version', help='get ceph version from container')
    parser_version.set_defaults(func=command_version)

    parser_ls = subparsers.add_parser(
        'ls', help='list daemon instances on this host')
    parser_ls.set_defaults(func=command_ls)
    parser_ls.add_argument(
        '--no-detail',
        action='store_true',
        help='Do not include daemon status')
    parser_ls.add_argument(
        '--legacy-dir',
        default='/',
        help='base directory for legacy daemon data')

    parser_adopt = subparsers.add_parser(
        'adopt', help='adopt daemon deployed with a different tool')
    parser_adopt.set_defaults(func=command_adopt)
    parser_adopt.add_argument(
        '--name', '-n',
        required=True,
        help='daemon name (type.id)')
    parser_adopt.add_argument(
        '--style',
        required=True,
        help='deployment style (legacy, ...)')
    parser_adopt.add_argument(
        '--cluster',
        default='ceph',
        help='cluster name')
    parser_adopt.add_argument(
        '--legacy-dir',
        default='/',
        help='base directory for legacy daemon data')

    parser_rm_daemon = subparsers.add_parser(
        'rm-daemon', help='remove daemon instance')
    parser_rm_daemon.set_defaults(func=command_rm_daemon)
    parser_rm_daemon.add_argument(
        '--name', '-n',
        required=True,
        help='daemon name (type.id)')
    parser_rm_daemon.add_argument(
        '--fsid',
        required=True,
        help='cluster FSID')
    parser_rm_daemon.add_argument(
        '--force',
        action='store_true',
        help='proceed, even though this may destroy valuable data')

    parser_rm_cluster = subparsers.add_parser(
        'rm-cluster', help='remove all daemons for a cluster')
    parser_rm_cluster.set_defaults(func=command_rm_cluster)
    parser_rm_cluster.add_argument(
        '--fsid',
        required=True,
        help='cluster FSID')
    parser_rm_cluster.add_argument(
        '--force',
        action='store_true',
        help='proceed, even though this may destroy valuable data')

    parser_run = subparsers.add_parser(
        'run', help='run a ceph daemon, in a container, in the foreground')
    parser_run.set_defaults(func=command_run)
    parser_run.add_argument(
        '--name', '-n',
        required=True,
        help='daemon name (type.id)')
    parser_run.add_argument(
        '--fsid',
        required=True,
        help='cluster FSID')

    parser_shell = subparsers.add_parser(
        'shell', help='run an interactive shell inside a daemon container')
    parser_shell.set_defaults(func=command_shell)
    parser_shell.add_argument(
        '--fsid',
        help='cluster FSID')
    parser_shell.add_argument(
        '--name', '-n',
        help='daemon name (type.id)')
    parser_shell.add_argument(
        '--config', '-c',
        help='ceph.conf to pass through to the container')
    parser_shell.add_argument(
        '--keyring', '-k',
        help='ceph.keyring to pass through to the container')
    parser_shell.add_argument(
        'command', nargs='*',
        help='command (optional)')

    parser_enter = subparsers.add_parser(
        'enter', help='run an interactive shell inside a running daemon container')
    parser_enter.set_defaults(func=command_enter)
    parser_enter.add_argument(
        '--fsid',
        help='cluster FSID')
    parser_enter.add_argument(
        '--name', '-n',
        required=True,
        help='daemon name (type.id)')
    parser_enter.add_argument(
        'command', nargs='*',
        help='command')

    parser_ceph_volume = subparsers.add_parser(
        'ceph-volume', help='run ceph-volume inside a container')
    parser_ceph_volume.set_defaults(func=command_ceph_volume)
    parser_ceph_volume.add_argument(
        '--fsid',
        help='cluster FSID')
    parser_ceph_volume.add_argument(
        '--config-and-keyring',
        help='JSON file with config and (client.bootstrap-osd) key')
    parser_ceph_volume.add_argument(
        'command', nargs='+',
        help='command')

    parser_unit = subparsers.add_parser(
        'unit', help='operate on the daemon\'s systemd unit')
    parser_unit.set_defaults(func=command_unit)
    parser_unit.add_argument(
        'command',
        help='systemd command (start, stop, restart, enable, disable, ...)')
    parser_unit.add_argument(
        '--fsid',
        help='cluster FSID')
    parser_unit.add_argument(
        '--name', '-n',
        required=True,
        help='daemon name (type.id)')

    parser_logs = subparsers.add_parser(
        'logs', help='fetch the log for a daemon\'s container')
    parser_logs.set_defaults(func=command_logs)
    parser_logs.add_argument(
        '--fsid',
        help='cluster FSID')
    parser_logs.add_argument(
        '--name', '-n',
        required=True,
        help='daemon name (type.id)')
    parser_logs.add_argument(
        '-f', '--follow',
        action='store_true',
        help='Follow log output')
    parser_logs.add_argument(
        '--tail',
        help='Output the specified number of lines at the end of the log')

    parser_bootstrap = subparsers.add_parser(
        'bootstrap', help='bootstrap a cluster (mon + mgr daemons)')
    parser_bootstrap.set_defaults(func=command_bootstrap)
    parser_bootstrap.add_argument(
        '--config', '-c',
        help='ceph conf file to incorporate')
    parser_bootstrap.add_argument(
        '--mon-id',
        required=False,
        help='mon id (default: local hostname)')
    parser_bootstrap.add_argument(
        '--mon-addrv',
        help='mon IPs (e.g., [v2:localipaddr:3300,v1:localipaddr:6789])')
    parser_bootstrap.add_argument(
        '--mon-ip',
        help='mon IP')
    parser_bootstrap.add_argument(
        '--mgr-id',
        required=False,
        help='mgr id (default: local hostname)')
    parser_bootstrap.add_argument(
        '--fsid',
        help='cluster FSID')
    parser_bootstrap.add_argument(
        '--output-keyring',
        default='ceph.client.admin.keyring',
        help='location to write keyring file with new cluster admin and mon keys')
    parser_bootstrap.add_argument(
        '--output-config',
        default='ceph.conf',
        help='location to write conf file to connect to new cluster')
    parser_bootstrap.add_argument(
        '--output-pub-ssh-key',
        default='ceph.pub',
        help='location to write the cluster\'s public SSH key')
    parser_bootstrap.add_argument(
        '--skip-ssh',
        action='store_true',
        help='skip setup of ssh key on local host')
    parser_bootstrap.add_argument(
        '--initial-dashboard-user',
        default='admin',
        help='Initial user for the dashboard')
    parser_bootstrap.add_argument(
        '--initial-dashboard-password',
        help='Initial password for the initial dashboard user')
    parser_bootstrap.add_argument(
        '--skip-dashboard',
        action='store_true',
        help='do not enable the Ceph Dashboard')
    parser_bootstrap.add_argument(
        '--no-minimize-config',
        action='store_true',
        help='do not assimilate and minimize the config file')
    parser_bootstrap.add_argument(
        '--skip-ping-check',
        action='store_true',
        help='do not verify that mon IP is pingable')
    parser_bootstrap.add_argument(
        '--skip-pull',
        action='store_true',
        help='do not pull the latest image before bootstrapping')
    parser_bootstrap.add_argument(
        '--allow-overwrite',
        action='store_true',
        help='allow overwrite of existing --output-* config/keyring/ssh files')

    parser_deploy = subparsers.add_parser(
        'deploy', help='deploy a daemon')
    parser_deploy.set_defaults(func=command_deploy)
    # '-n' matches the short alias every other --name option already has;
    # the long form is unchanged, so existing callers keep working.
    parser_deploy.add_argument(
        '--name', '-n',
        required=True,
        help='daemon name (type.id)')
    parser_deploy.add_argument(
        '--fsid',
        required=True,
        help='cluster FSID')
    parser_deploy.add_argument(
        '--config', '-c',
        help='config file for new daemon')
    parser_deploy.add_argument(
        '--keyring',
        help='keyring for new daemon')
    parser_deploy.add_argument(
        '--crash-keyring',
        help='crash keyring for crash agent daemon')
    parser_deploy.add_argument(
        '--key',
        help='key for new daemon')
    parser_deploy.add_argument(
        '--config-and-keyrings',
        help='JSON file with config and keyrings for the daemon and crash agent')
    parser_deploy.add_argument(
        '--mon-ip',
        help='mon IP')
    parser_deploy.add_argument(
        '--mon-addrv',
        help='mon IPs (e.g., [v2:localipaddr:3300,v1:localipaddr:6789])')
    parser_deploy.add_argument(
        '--mon-network',
        help='mon network (CIDR)')
    parser_deploy.add_argument(
        '--osd-fsid',
        help='OSD uuid, if creating an OSD container')

    return parser


if __name__ == "__main__":
    # Allow argv to be injected when the script is piped into python3
    # (see the module docstring); otherwise use the real command line.
    try:
        av = injected_argv # type: ignore
    except NameError:
        av = sys.argv[1:]
    parser = _get_parser()
    args = parser.parse_args(av)

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('ceph-daemon')

    # Sub-commands are optional to argparse, so a bare invocation (or one
    # carrying only global flags) has no 'func'.  Report this usage error
    # before anything else: it requires neither root nor a container engine,
    # and otherwise the user would see an unrelated root/engine error.
    if 'func' not in args:
        sys.stderr.write('No command specified; pass -h or --help for usage\n')
        sys.exit(1)

    # root?
    if os.geteuid() != 0:
        sys.stderr.write('ERROR: ceph-daemon should be run as root\n')
        sys.exit(1)

    # podman or docker?  This assigns the module-level container_path,
    # which is presumably read by the container helpers elsewhere in the file.
    if args.docker:
        try:
            container_path = find_program('docker')
        except Exception as e:
            # Report a missing docker the same way a missing podman/docker is
            # reported below, instead of dying with a traceback.
            sys.stderr.write('Unable to locate docker: %s\n' % e)
            sys.exit(1)
    else:
        for i in CONTAINER_PREFERENCE:
            try:
                container_path = find_program(i)
                break
            except Exception as e:
                logger.debug('Could not locate %s: %s', i, e)
        if not container_path:
            sys.stderr.write('Unable to locate any of %s\n' % CONTAINER_PREFERENCE)
            sys.exit(1)

    # Command handlers are called with no arguments; they read the
    # module-level `args` (and related globals) set above.
    try:
        r = args.func()
    except Error as e:
        if args.verbose:
            raise
        sys.stderr.write('ERROR: %s\n' % e)
        sys.exit(1)
    # A handler returning None (or any falsy value) means success.
    sys.exit(r or 0)
